
feat(sink): support bigquery sink upsert #15780

Merged: 16 commits merged into main from xxh/support-bigquery-cdc on Apr 26, 2024

Conversation

@xxhZs (Contributor) commented Mar 19, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

#14882
Refactor the BigQuery sink to use the Storage Write API (https://cloud.google.com/bigquery/docs/write-api) and support upsert in the BigQuery sink.
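
For context, the Storage Write API appends protobuf-serialized rows together with a writer schema that describes them. Below is a minimal sketch of the request shape, assuming the google-cloud-googleapis message bindings; the module paths and the Default plumbing are assumptions, not the exact code in this PR.

use google_cloud_googleapis::cloud::bigquery::storage::v1::{
    append_rows_request::{ProtoData, Rows},
    AppendRowsRequest, ProtoRows, ProtoSchema,
};
use prost_types::DescriptorProto;

// Build one AppendRows request: the row schema travels as a proto descriptor,
// and each row is pre-serialized as a protobuf message of that schema.
fn build_append_request(
    write_stream: String,
    descriptor_proto: DescriptorProto,
    serialized_rows: Vec<Vec<u8>>, // one encoded message per row
) -> AppendRowsRequest {
    AppendRowsRequest {
        write_stream,
        rows: Some(Rows::ProtoRows(ProtoData {
            writer_schema: Some(ProtoSchema {
                proto_descriptor: Some(descriptor_proto),
            }),
            rows: Some(ProtoRows { serialized_rows }),
        })),
        ..Default::default()
    }
}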

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

The BigQuery sink now supports upsert. Users need to set the corresponding permissions and primary key according to the BigQuery change data capture documentation:
https://cloud.google.com/bigquery/docs/change-data-capture?hl=zh-cn

@xxhZs xxhZs requested a review from a team as a code owner March 19, 2024 08:18
@xxhZs xxhZs changed the title feat(sink): support bigquery upsert feat(sink): support bigquery sink upsert Mar 19, 2024
@xxhZs xxhZs force-pushed the xxh/support-bigquery-cdc branch 2 times, most recently from 5cc2579 to 2b2402e Compare March 19, 2024 09:49
@xxhZs xxhZs requested review from hzxa21 and wenym1 March 19, 2024 09:49
@xxhZs xxhZs added the user-facing-changes Contains changes that are visible to users label Mar 19, 2024
@@ -364,18 +409,61 @@ fn encode_field<D: MaybeData>(
Ok(Value::Message(message.transcode_to_dynamic()))
})?
}
(false, Kind::String) if is_big_query => {

Contributor:

It seems that the newly added custom_proto_type is only used to generate the is_big_query flag, and the flag is only used to control the logic when seeing different datatypes. If so, instead of adding this new parameter, we can just add new methods like on_timestamptz, on_jsonb ... to the MaybeData trait and then call the corresponding trait methods here. And then for bigquery we can implement its own MaybeData with new customized logic and then pass it to the encoder.

Contributor (author):

There's a match between RW types and proto types; BigQuery has some special matches (many of which convert to string) that require is_big_query to determine whether the match holds.

Contributor:

The newly added logic does not look specific to BigQuery. It's more like an extension of the original type compatibility and can be generalized for proto encoding used in sinks other than BigQuery (cc @xiangjinwu).

If so, we can remove the is_big_query flag and custom_proto_type and make it a general way of processing.

Contributor:

I agree it can be more general. It is better to introduce a TimestamptzHandlingMode and bigquery can just select one of the string formats, similar to the json encoder.

However it may not be another implementation of MaybeData (for bigquery). That trait is only meant to be implemented twice: once with type info alone (for validation), and once with concrete datum (for encoding). Given that we want to affect both validation and encoding here, it is supposed to be in encode_field here.
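
To make this concrete, here is a minimal sketch of such an option, mirroring the JSON encoder's TimestamptzHandlingMode; the variant names and the chrono-based formatting are illustrative assumptions, not the final API.

use chrono::{DateTime, Utc};

/// How a timestamptz datum is rendered by the proto encoder.
#[derive(Clone, Copy)]
enum TimestamptzHandlingMode {
    /// keep the proto int64 of microseconds since the UNIX epoch
    Micro,
    /// render an RFC 3339 UTC string, the form BigQuery string columns accept
    UtcString,
}

enum EncodedTimestamptz {
    I64(i64),
    String(String),
}

/// What encode_field could emit for a timestamptz datum under each mode.
fn encode_timestamptz(epoch_micros: i64, mode: TimestamptzHandlingMode) -> EncodedTimestamptz {
    match mode {
        TimestamptzHandlingMode::Micro => EncodedTimestamptz::I64(epoch_micros),
        TimestamptzHandlingMode::UtcString => EncodedTimestamptz::String(
            DateTime::<Utc>::from_timestamp_micros(epoch_micros)
                .expect("timestamp in range")
                .to_rfc3339(),
        ),
    }
}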

src/connector/src/sink/encoder/proto.rs (outdated; resolved)

Commits: "remove index", "fix", "fix"
@xxhZs xxhZs requested a review from xiangjinwu April 2, 2024 04:08
@xxhZs xxhZs requested a review from wenym1 April 9, 2024 05:06
* Group C: experimental */
},
DataType::Int16 => match (expect_list, proto_field.kind()) {
(false, Kind::Int64) => maybe.on_base(|s| Ok(Value::I64(s.into_int16() as i64)))?,

Contributor:

May also add support for casting to Int32 and Int16 by the way.
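
As a standalone sketch of that suggestion: protobuf has no 16-bit scalar, so an RW Int16 can reasonably target proto int32 in addition to int64. The on_base plumbing from the surrounding code is elided here; the prost-reflect Value variants are shown directly.

use prost_reflect::Value;

// Widen an int16 datum to whichever integer width the proto field declares.
fn encode_int16(v: i16, target_is_int32: bool) -> Value {
    if target_is_int32 {
        Value::I32(v as i32) // proposed: proto int32 target
    } else {
        Value::I64(v as i64) // existing arm: proto int64 target
    }
}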

Cargo.lock (resolved)
src/connector/with_options_sink.yaml (outdated; resolved)
message_descriptor,
writer_pb_schema: ProtoSchema {
proto_descriptor: Some(descriptor_proto),
},
})
}

async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {

Contributor:

Do we still need this special append_only method? It is only a subset of upsert.

Contributor (author):

One difference is that append-only does not add the CHANGE_TYPE column
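
For reference, BigQuery CDC over the Storage Write API drives upserts through a _CHANGE_TYPE pseudo-column whose values are UPSERT and DELETE. A sketch of how stream-chunk ops could map onto it; the mapping below is inferred from the BigQuery CDC docs, not copied from this PR.

use risingwave_common::array::Op;

// In upsert mode every row carries a change type; in append-only mode the
// column is omitted entirely, which is the difference noted above.
fn change_type(op: Op) -> Option<&'static str> {
    match op {
        Op::Insert | Op::UpdateInsert => Some("UPSERT"),
        Op::Delete => Some("DELETE"),
        // The old image of an update can be skipped: the UPSERT keyed on the
        // primary key already replaces the previous row.
        Op::UpdateDelete => None,
    }
}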

Resolved review threads on src/connector/src/sink/big_query.rs
Commit: "fix"
}

async fn get_auth_json_from_path(&self, aws_auth_props: &AwsAuthProps) -> Result<String> {
if let Some(local_path) = &self.local_path {

Contributor:

Maybe not related to this PR: can we avoid using separate local_path and s3_path options and instead use a single path option? The path type could be distinguished by its prefix, such as file:// or s3://.

Besides, BigQuery is on Google Cloud Platform, yet users are expected to upload the auth file to S3, which looks strange to me. Does GCP have an S3-compatible object store?
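
A sketch of that suggestion, with hypothetical names (the PR as written keeps two separate fields):

/// Where the service-account JSON lives, derived from one `path` option.
enum AuthSource {
    Local(String),
    S3(String),
}

fn parse_auth_path(path: &str) -> Result<AuthSource, String> {
    if let Some(local) = path.strip_prefix("file://") {
        Ok(AuthSource::Local(local.to_owned()))
    } else if path.starts_with("s3://") {
        // keep the full URI; the S3 loader parses bucket/key itself
        Ok(AuthSource::S3(path.to_owned()))
    } else {
        Err(format!("unsupported auth path scheme: {path}"))
    }
}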

@docteurklein commented Apr 10, 2024:


Agreed, references to S3 or AWS in general always confused me. Is there a GCP alternative?

Contributor (author):

Unfortunately, this file is necessary to connect to GCP, so it can't be stored on GCP itself. An alternative is to pass the JSON content of the file directly as a string.

Contributor:

We may provide an extra option for users to include the raw auth string in the sink options.

This can be done in a separate PR, since the current PR is already large and this feature is independent of it.
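
A sketch of what that extra option could look like; the field and type names are hypothetical.

/// Auth options with an inline-JSON alternative alongside the file path.
struct BigQueryAuth {
    /// raw service-account JSON pasted directly into the sink options
    credentials_json: Option<String>,
    /// path to the JSON file (local or S3), as in the current PR
    auth_file_path: Option<String>,
}

enum AuthJson {
    Inline(String),
    FromPath(String),
}

impl BigQueryAuth {
    /// Prefer the inline JSON when both are given.
    fn resolve(&self) -> Option<AuthJson> {
        match (&self.credentials_json, &self.auth_file_path) {
            (Some(raw), _) => Some(AuthJson::Inline(raw.clone())),
            (None, Some(path)) => Some(AuthJson::FromPath(path.clone())),
            (None, None) => None,
        }
    }
}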

Resolved review threads on src/connector/src/sink/big_query.rs (outdated)
@wenym1 (Contributor) left a comment:

Rest LGTM.

@xxhZs xxhZs enabled auto-merge April 11, 2024 07:17
@xxhZs xxhZs disabled auto-merge April 11, 2024 07:17
checksum = "c48abc8687f4c4cc143dd5bd3da5f1d7ef38334e4af5cef6de4c39295c6a3fd0"
dependencies = [
"anyhow",
"arrow 50.0.0",

Collaborator:

We introduce the full arrow 50 dependency tree here. We should verify whether it greatly affects the compile time of DEBUG and RELEASE builds. If it does, we may need to consider how to avoid this dependency, given that we technically don't use arrow in google-cloud-bigquery.

Contributor (author) @xxhZs commented Apr 12, 2024:

Tested on my machine:

profile  branch      time     target size  build num
release  main        27m 24s  27173092317  1471
release  current pr  28m 14s  27275146355  1476
debug    main        4m 31s   48776880726  1413
debug    current pr  4m 51s   48920738495  1418

Collaborator:

The result looks good. Thanks for the efforts.

@xxhZs xxhZs requested a review from hzxa21 April 19, 2024 03:05

@hzxa21 (Collaborator) left a comment:

LGTM.

@xxhZs xxhZs enabled auto-merge April 25, 2024 05:51
@xxhZs xxhZs added this pull request to the merge queue Apr 25, 2024
@github-merge-queue github-merge-queue bot removed this pull request from the merge queue due to failed status checks Apr 25, 2024
@xxhZs xxhZs enabled auto-merge April 25, 2024 06:47
@xxhZs xxhZs disabled auto-merge April 25, 2024 07:06
@xxhZs xxhZs enabled auto-merge April 26, 2024 04:15
@xxhZs xxhZs added this pull request to the merge queue Apr 26, 2024
Merged via the queue into main with commit 2dcd0f6 Apr 26, 2024
27 of 28 checks passed
@xxhZs xxhZs deleted the xxh/support-bigquery-cdc branch April 26, 2024 06:07
Comment on lines 23 to +24
use gcp_bigquery_client::Client;
use google_cloud_bigquery::grpc::apiv1::bigquery_client::StreamingWriteClient;

Member:

Why are we using two BigQuery clients? Can we use only one? (The former, gcp_bigquery_client, is quite old now.)

Labels: type/feature, user-facing-changes (Contains changes that are visible to users)

6 participants